package main

import (
	"context"
	"fmt"
	"gitee.com/zackeus/go-boot/rocketmq"
	"gitee.com/zackeus/go-boot/rocketmq/consumer"
	"gitee.com/zackeus/go-boot/rocketmq/primitive"
	"gitee.com/zackeus/go-boot/rocketmq/rlog"
	"gitee.com/zackeus/go-zero/core/logx"
	"os"
	"os/signal"
	"syscall"
)

//const (
//	Topic         = "test-topic"
//	ConsumerGroup = "test-group"
//	Endpoint      = "192.168.137.31:9876"
//	AccessKey     = "rmq"
//	SecretKey     = "baicaf5678"
//)

const (
	Topic         = "test-topic"
	ConsumerGroup = "test-group"
	Endpoint      = "10.5.54.141:9876"
	AccessKey     = "testuser"
	SecretKey     = "yulon123"
)

func main() {
	if err := logx.SetUp(logx.LogConf{Mode: "console", Encoding: "plain"}); err != nil {
		fmt.Println(err)
		return
	}
	rlog.SetLevel("error")

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGTERM, syscall.SIGINT)
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName(ConsumerGroup),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{Endpoint})),
		consumer.WithConsumeGoroutineNums(20),
		consumer.WithConsumeMessageBatchMaxSize(1),
		/* 最大重试次数 超过次数会送入死信队列 */
		consumer.WithMaxReconsumeTimes(3),
		/* 消费 TPS */
		consumer.WithConsumeTPS(1000),
		/* 鉴权消息 */
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: AccessKey,
			SecretKey: SecretKey,
		}),
	)

	//selector := consumer.MessageSelector{
	//	Type:       consumer.SQL92,
	//	Expression: "(TAGS is not null and TAGS in ('cdr')) and business_access_code = 'test1'",
	//}
	//if err := c.Subscribe(Topic, selector, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	//	for i := range ext {
	//		logx.Info("test1: ", string(ext[i].Body))
	//	}
	//	return consumer.ConsumeSuccess, nil
	//}); err != nil {
	//	fmt.Println(err.Error())
	//}

	//if err := c.Subscribe(Topic, consumer.MessageSelector{
	//	Type:       consumer.TAG,
	//	Expression: strings.Join([]string{"cdr"}, " || "),
	//}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	//	for i := range ext {
	//		logx.Info("recv: ", string(ext[i].Body))
	//	}
	//	return consumer.ConsumeRetryLater, nil
	//}); err != nil {
	//	fmt.Println(err.Error())
	//}

	if err := c.Subscribe(Topic, consumer.MessageSelector{}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for i := range ext {
			logx.Info("recv: ", string(ext[i].Body))
		}
		return consumer.ConsumeSuccess, nil
	}); err != nil {
		fmt.Println(err.Error())
	}

	// Note: start after subscribe
	if err := c.Start(); err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}

	<-signals
	if err := c.Shutdown(); err != nil {
		fmt.Printf("shutdown Consumer error: %s", err.Error())
	}
}
